As a distributed file system, HDFS excels at reading and writing large files. This comes from the idea of "separating file metadata from file data, and storing file data in blocks": the namenode manages file metadata, while datanodes manage the file data split into blocks.
HDFS 2.x further abstracts the block storage service into block pools, but the block-write process is largely the same as in 1.x. This article analyzes how a datanode writes a block, assuming a replication factor of 1 (i.e., the write involves only one client and one datanode) and that no error occurs.
Source code version: Apache Hadoop 2.6.0
You can set breakpoints following the notes I took while tracing the source and step through it yourself in a debugger.
With a replication factor of 1, a single datanode forms the smallest possible pipeline; compared with the more common pipelined write, this can be thought of as "no pipeline". Two follow-up articles will analyze the pipelined write without errors and the pipelined write with errors, respectively.
Before we begin
Overview
From the earlier article 源码|HDFS之DataNode:启动过程 (on the DataNode startup process), we already have a rough idea of the important worker threads running on a datanode. Of these, DataXceiverServer and BPServiceActor are the ones most closely involved in the block-write process.
As covered in HDFS-1.x、2.x的RPC接口 (the article on the HDFS 1.x/2.x RPC interfaces), the client and the datanodes read and write blocks mainly through the streaming interface DataTransferProtocol. DataTransferProtocol handles the streaming communication between the client and the datanodes along the whole pipeline; in particular, DataTransferProtocol#writeBlock() is responsible for writing a block:
```java
public void writeBlock(final ExtendedBlock blk,
    final StorageType storageType,
    final Token<BlockTokenIdentifier> blockToken,
    final String clientName,
    final DatanodeInfo[] targets,
    final StorageType[] targetStorageTypes,
    final DatanodeInfo source,
    final BlockConstructionStage stage,
    final int pipelineSize,
    final long minBytesRcvd,
    final long maxBytesRcvd,
    final long latestGenerationStamp,
    final DataChecksum requestedChecksum,
    final CachingStrategy cachingStrategy,
    final boolean allowLazyPersist) throws IOException;
```
How this article is organized
- If the analysis involves only a single branch, it stays within one section.
- If multiple branches are involved, the section is split into subsections at the next level, one per branch.
- Multiple threads are analyzed the same way as multiple branches.
- Each branch and each thread is organized according to rules 1-3 above.
The DataXceiverServer thread
Note that DataTransferProtocol is not an RPC protocol, so the usual approach of locating the implementation class of the DataTransferProtocol interface to determine "which remote method the client calls" does not hold here. We can still trace backwards along the same lines, though: look at how the implementation class is created and whom it talks to, in order to verify that we have found the right one.
By debugging, I traced backwards from the DataXceiver class to the DataXceiverServer class. Here we start from DataXceiverServer and walk forwards.
The DataXceiverServer thread is started in DataNode#runDatanodeDaemon().
DataXceiverServer#run():
```java
public void run() {
  Peer peer = null;
  while (datanode.shouldRun && !datanode.shutdownForUpgrade) {
    try {
      peer = peerServer.accept();
      ...
      new Daemon(datanode.threadGroup,
          DataXceiver.create(peer, datanode, this))
          .start();
    } catch (SocketTimeoutException ignored) {
    } catch (AsynchronousCloseException ace) {
      if (datanode.shouldRun && !datanode.shutdownForUpgrade) {
        LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ace);
      }
    } catch (IOException ie) {
      ...
    } catch (OutOfMemoryError ie) {
      ...
    } catch (Throwable te) {
      LOG.error(datanode.getDisplayName()
          + ":DataXceiverServer: Exiting due to: ", te);
      datanode.shouldRun = false;
    }
  }
  ...
}
```
The DataXceiverServer thread is a typical TCP socket server. For every incoming TCP connection from a client, it starts a new DataXceiver thread, provided the number of DataXceiver threads on the datanode has not exceeded the limit.
The maximum number of DataXceiver threads defaults to 4096 and is configured via dfs.datanode.max.transfer.threads.
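For reference, the limit can be raised in hdfs-site.xml; the snippet below is only an illustrative example (the value 8192 is an arbitrary choice, not a recommendation):

```xml
<!-- hdfs-site.xml: raise the DataXceiver thread limit (example value) -->
<property>
  <name>dfs.datanode.max.transfer.threads</name>
  <value>8192</value>
</property>
```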
Main flow: the DataXceiver thread
DataXceiver#run():
```java
public void run() {
  int opsProcessed = 0;
  Op op = null;
  try {
    ...
    do {
      updateCurrentThreadName("Waiting for operation #" + (opsProcessed + 1));
      try {
        ...
        op = readOp();
      } catch (InterruptedIOException ignored) {
        break;
      } catch (IOException err) {
        ...
      }
      ...
      opStartTime = now();
      processOp(op);
      ++opsProcessed;
    } while ((peer != null) &&
        (!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0));
  } catch (Throwable t) {
    ...
  } finally {
    ...
  }
}
```
We won't dwell on the optimization here (as long as socket keepalive is enabled, the do-while loop reuses the same connection for multiple operations).
DataXceiver#readOp() is inherited from the Receiver class: it reads the op code from the socket opened by the client to determine which operation the client wants to perform. A block write uses op code 80, and the returned enum value is op = Op.WRITE_BLOCK.
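To make the framing concrete, here is a simplified sketch, not the actual Receiver/Op source, of how an op byte read off the wire maps to an enum constant; the enum name SimpleOp is made up for illustration, and 80 is the WRITE_BLOCK code mentioned above:

```java
import java.io.DataInputStream;
import java.io.IOException;

// Simplified sketch of op decoding; only WRITE_BLOCK (code 80) is shown.
enum SimpleOp {
  WRITE_BLOCK((byte) 80);

  final byte code;
  SimpleOp(byte code) { this.code = code; }

  static SimpleOp read(DataInputStream in) throws IOException {
    byte b = in.readByte();            // one op byte read from the client's socket
    for (SimpleOp op : values()) {
      if (op.code == b) return op;
    }
    throw new IOException("Unknown op code " + b);
  }
}
```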
DataXceiver#processOp() is also inherited from the Receiver class:
```java
protected final void processOp(Op op) throws IOException {
  switch(op) {
  case READ_BLOCK:
    opReadBlock();
    break;
  case WRITE_BLOCK:
    opWriteBlock(in);
    break;
  ...
  default:
    throw new IOException("Unknown op " + op + " in data stream");
  }
}

...

private void opWriteBlock(DataInputStream in) throws IOException {
  final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
  final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList());
  TraceScope traceScope = continueTraceSpan(proto.getHeader(),
      proto.getClass().getSimpleName());
  try {
    writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
        PBHelper.convertStorageType(proto.getStorageType()),
        PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
        proto.getHeader().getClientName(),
        targets,
        PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length),
        PBHelper.convert(proto.getSource()),
        fromProto(proto.getStage()),
        proto.getPipelineSize(),
        proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
        proto.getLatestGenerationStamp(),
        fromProto(proto.getRequestedChecksum()),
        (proto.hasCachingStrategy() ?
            getCachingStrategy(proto.getCachingStrategy()) :
            CachingStrategy.newDefaultStrategy()),
        (proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false));
  } finally {
    if (traceScope != null) traceScope.close();
  }
}
```
Another improvement of HDFS 2.x over 1.x: the streaming interface now largely uses protobuf as well, instead of parsing raw byte streams over bare TCP.
The Receiver class implements the DataTransferProtocol interface but does not implement DataTransferProtocol#writeBlock(). By polymorphism, the call here dispatches to DataXceiver#writeBlock().
At last we arrive at DataXceiver#writeBlock():
```java
public void writeBlock(final ExtendedBlock block,
    final StorageType storageType,
    final Token<BlockTokenIdentifier> blockToken,
    final String clientname,
    final DatanodeInfo[] targets,
    final StorageType[] targetStorageTypes,
    final DatanodeInfo srcDataNode,
    final BlockConstructionStage stage,
    final int pipelineSize,
    final long minBytesRcvd,
    final long maxBytesRcvd,
    final long latestGenerationStamp,
    DataChecksum requestedChecksum,
    CachingStrategy cachingStrategy,
    final boolean allowLazyPersist) throws IOException {
  ...
  try {
    if (isDatanode ||
        stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
      blockReceiver = new BlockReceiver(block, storageType, in,
          peer.getRemoteAddressString(),
          peer.getLocalAddressString(),
          stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
          clientname, srcDataNode, datanode, requestedChecksum,
          cachingStrategy, allowLazyPersist);
      storageUuid = blockReceiver.getStorageUuid();
    } else {
      ...
    }
    ...
    if (isClient && !isTransfer) {
      if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
        LOG.info("Datanode " + targets.length +
            " forwarding connect ack to upstream firstbadlink is " +
            firstBadLink);
      }
      BlockOpResponseProto.newBuilder()
          .setStatus(mirrorInStatus)
          .setFirstBadLink(firstBadLink)
          .build()
          .writeDelimitedTo(replyOut);
      replyOut.flush();
    }

    if (blockReceiver != null) {
      String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
      blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
          mirrorAddr, null, targets, false);
      ...
    }
    ...
  } catch (IOException ioe) {
    LOG.info("opWriteBlock " + block + " received exception " + ioe);
    throw ioe;
  } finally {
    ...
  }
  ...
}
```
A few parameters deserve special explanation (a sketch of how the flags are derived follows this list):
- stage: the construction stage of the block. Here it is BlockConstructionStage.PIPELINE_SETUP_CREATE.
- isDatanode: whether the write request was initiated by a datanode. If clientname in the request is empty, the request came from a datanode (block replication, for example, is initiated by datanodes). Here it is false.
- isClient: whether the write request was initiated by a client; always the opposite of isDatanode. Here it is true.
- isTransfer: whether the write request is a block transfer (replication). If stage is BlockConstructionStage.TRANSFER_RBW or BlockConstructionStage.TRANSFER_FINALIZED, the request is for block replication. Here it is false.
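Roughly, the flags are derived at the start of DataXceiver#writeBlock(), which the excerpt above elides; treat the fragment below as a paraphrase rather than an exact quote:

```java
// Paraphrase of the elided flag derivation in DataXceiver#writeBlock().
final boolean isDatanode = clientname.length() == 0;       // empty clientname => datanode-initiated
final boolean isClient = !isDatanode;                       // always the opposite of isDatanode
final boolean isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
    || stage == BlockConstructionStage.TRANSFER_FINALIZED;  // block replication/transfer
```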
Below we discuss two steps: "preparing to receive the block" and "receiving the block".
Preparing to receive the block: BlockReceiver.<init>()
BlockReceiver.<init>():
```java
BlockReceiver(final ExtendedBlock block, final StorageType storageType,
    final DataInputStream in,
    final String inAddr, final String myAddr,
    final BlockConstructionStage stage,
    final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
    final String clientname, final DatanodeInfo srcDataNode,
    final DataNode datanode, DataChecksum requestedChecksum,
    CachingStrategy cachingStrategy,
    final boolean allowLazyPersist) throws IOException {
  try {
    ...
    if (isDatanode) {
      replicaInfo = datanode.data.createTemporary(storageType, block);
    } else {
      switch (stage) {
      case PIPELINE_SETUP_CREATE:
        replicaInfo = datanode.data.createRbw(storageType, block, allowLazyPersist);
        datanode.notifyNamenodeReceivingBlock(
            block, replicaInfo.getStorageUuid());
        break;
      ...
      default:
        throw new IOException("Unsupported stage " + stage +
            " while receiving block " + block + " from " + inAddr);
      }
    }
    ...
    final boolean isCreate = isDatanode || isTransfer
        || stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
    streams = replicaInfo.createStreams(isCreate, requestedChecksum);
    assert streams != null : "null streams!";
    ...
    if (isCreate) {
      BlockMetadataHeader.writeHeader(checksumOut, diskChecksum);
    }
  } catch (ReplicaAlreadyExistsException bae) {
    throw bae;
  } catch (ReplicaNotFoundException bne) {
    throw bne;
  } catch(IOException ioe) {
    ...
  }
}
```
Although quite a few comments were added to the code above, the block-creation case is fairly simple: the key point is that the block file and the meta file are created under the rbw directory.
After the block is created under the rbw directory, DataNode#notifyNamenodeReceivingBlock() is called to report to the namenode that the block is being received. This method merely puts the block into a buffer; the BPServiceActor thread reports it asynchronously.
We won't expand on it here; a similar method, DataNode#notifyNamenodeReceivedBlock(), is covered later.
Receiving the block: BlockReceiver#receiveBlock()
BlockReceiver#receiveBlock():
```java
void receiveBlock(
    DataOutputStream mirrOut,  // output to next datanode
    DataInputStream mirrIn,    // input from next datanode
    DataOutputStream replyOut, // output to previous datanode
    String mirrAddr, DataTransferThrottler throttlerArg,
    DatanodeInfo[] downstreams,
    boolean isReplaceBlock) throws IOException {
  ...
  try {
    if (isClient && !isTransfer) {
      responder = new Daemon(datanode.threadGroup,
          new PacketResponder(replyOut, mirrIn, downstreams));
      responder.start();
    }

    while (receivePacket() >= 0) {}

    if (responder != null) {
      ((PacketResponder)responder.getRunnable()).close();
      responderClosed = true;
    }
    ...
  } catch (IOException ioe) {
    if (datanode.isRestarting()) {
      LOG.info("Shutting down for restart (" + block + ").");
    } else {
      LOG.info("Exception for " + block, ioe);
      throw ioe;
    }
  } finally {
    ...
  }
}
```
Receiving packets synchronously: BlockReceiver#receivePacket()
Let's look at BlockReceiver#receivePacket() first.
Strictly speaking, BlockReceiver#receivePacket() receives a packet from the upstream node and continues the pipelined write to the downstream node:
```java
private int receivePacket() throws IOException {
  packetReceiver.receiveNextPacket(in);

  PacketHeader header = packetReceiver.getHeader();
  ...
  long offsetInBlock = header.getOffsetInBlock();
  long seqno = header.getSeqno();
  boolean lastPacketInBlock = header.isLastPacketInBlock();
  final int len = header.getDataLen();
  boolean syncBlock = header.getSyncBlock();
  ...
  if (responder != null && !syncBlock && !shouldVerifyChecksum()) {
    ((PacketResponder) responder.getRunnable()).enqueue(seqno,
        lastPacketInBlock, offsetInBlock, Status.SUCCESS);
  }
  ...
  ByteBuffer dataBuf = packetReceiver.getDataSlice();
  ByteBuffer checksumBuf = packetReceiver.getChecksumSlice();

  if (lastPacketInBlock || len == 0) {
    if (syncBlock) {
      flushOrSync(true);
    }
  } else {
    final int checksumLen = diskChecksum.getChecksumSize(len);
    final int checksumReceivedLen = checksumBuf.capacity();
    ...
    final boolean shouldNotWriteChecksum = checksumReceivedLen == 0
        && streams.isTransientStorage();
    try {
      long onDiskLen = replicaInfo.getBytesOnDisk();
      if (onDiskLen < offsetInBlock) {
        ...
        int startByteToDisk = (int)(onDiskLen - firstByteInBlock)
            + dataBuf.arrayOffset() + dataBuf.position();
        int numBytesToDisk = (int)(offsetInBlock - onDiskLen);
        out.write(dataBuf.array(), startByteToDisk, numBytesToDisk);

        final byte[] lastCrc;
        if (shouldNotWriteChecksum) {
          lastCrc = null;
        } else if (partialCrc != null) {
          ...
          checksumOut.write(buf);
          partialCrc = null;
        } else {
          ...
          checksumOut.write(checksumBuf.array(), offset, checksumLen);
        }
        ...
      }
    } catch (IOException iex) {
      datanode.checkDiskErrorAsync();
      throw iex;
    }
  }

  if (responder != null && (syncBlock || shouldVerifyChecksum())) {
    ((PacketResponder) responder.getRunnable()).enqueue(seqno,
        lastPacketInBlock, offsetInBlock, Status.SUCCESS);
  }
  ...
  return lastPacketInBlock ? -1 : len;
}

...

private boolean shouldVerifyChecksum() {
  return (mirrorOut == null || isDatanode || needsChecksumTranslation);
}
```
BlockReceiver#shouldVerifyChecksum() mainly matters for pipelined writes; with only one datanode in this article, mirrorOut == null always holds.
The code above looks long, but it does only four main things:
- Receive the packet
- Verify the packet checksum
- Persist the packet to disk
- Delegate ack sending to the PacketResponder thread
BlockReceiver#receivePacket(), the PacketResponder thread, and PacketResponder#ackQueue together form a producer-consumer model. The items produced and consumed are acks: BlockReceiver#receivePacket() is the producer, and the PacketResponder thread is the consumer.
A quick look at PacketResponder#enqueue():
```java
void enqueue(final long seqno, final boolean lastPacketInBlock,
    final long offsetInBlock, final Status ackStatus) {
  final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock,
      System.nanoTime(), ackStatus);
  if (LOG.isDebugEnabled()) {
    LOG.debug(myString + ": enqueue " + p);
  }
  synchronized(ackQueue) {
    if (running) {
      ackQueue.addLast(p);
      ackQueue.notifyAll();
    }
  }
}
```
ackQueue is a thread-unsafe LinkedList.
For how to build a producer-consumer model on top of a thread-unsafe container, see implementation 3 in Java实现生产者-消费者模型 (the producer-consumer article). A minimal sketch of the same pattern follows.
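The sketch below is self-contained and only illustrates the pattern; the class and method names are made up, but the structure mirrors PacketResponder's enqueue/waitForAckHead/removeAckHead: a plain LinkedList guarded by synchronized, wait, and notifyAll.

```java
import java.util.LinkedList;

// Minimal producer-consumer sketch in the style of PacketResponder's ackQueue:
// the list itself is not thread-safe, so every access is guarded by the same lock.
class AckQueueSketch<T> {
  private final LinkedList<T> queue = new LinkedList<>();
  private volatile boolean running = true;

  // producer side (cf. PacketResponder#enqueue)
  void enqueue(T item) {
    synchronized (queue) {
      if (running) {
        queue.addLast(item);
        queue.notifyAll();   // wake up a consumer waiting in takeHead()
      }
    }
  }

  // consumer side (cf. PacketResponder#waitForAckHead + removeAckHead)
  T takeHead() throws InterruptedException {
    synchronized (queue) {
      while (running && queue.isEmpty()) {
        queue.wait();        // releases the lock while waiting
      }
      return running ? queue.removeFirst() : null;
    }
  }
}
```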
Sending acks asynchronously: the PacketResponder thread
In contrast to BlockReceiver#receivePacket(), the PacketResponder thread receives acks from the downstream node and forwards the response upstream along the pipeline.
PacketResponder#run():
```java
public void run() {
  boolean lastPacketInBlock = false;
  final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
  while (isRunning() && !lastPacketInBlock) {
    long totalAckTimeNanos = 0;
    boolean isInterrupted = false;
    try {
      Packet pkt = null;
      long expected = -2;
      PipelineAck ack = new PipelineAck();
      long seqno = PipelineAck.UNKOWN_SEQNO;
      long ackRecvNanoTime = 0;
      try {
        if (type != PacketResponderType.LAST_IN_PIPELINE && !mirrorError) {
          ack.readFields(downstreamIn);
          ...
          seqno = ack.getSeqno();
        }
        if (seqno != PipelineAck.UNKOWN_SEQNO
            || type == PacketResponderType.LAST_IN_PIPELINE) {
          pkt = waitForAckHead(seqno);
          if (!isRunning()) {
            break;
          }
          expected = pkt.seqno;
          if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE
              && seqno != expected) {
            throw new IOException(myString + "seqno: expected=" + expected
                + ", received=" + seqno);
          }
          ...
          lastPacketInBlock = pkt.lastPacketInBlock;
        }
      } catch (InterruptedException ine) {
        ...
      } catch (IOException ioe) {
        ...
      }
      ...
      if (lastPacketInBlock) {
        finalizeBlock(startTime);
      }

      sendAckUpstream(ack, expected, totalAckTimeNanos,
          (pkt != null ? pkt.offsetInBlock : 0),
          (pkt != null ? pkt.ackStatus : Status.SUCCESS));

      if (pkt != null) {
        removeAckHead();
      }
    } catch (IOException e) {
      ...
    } catch (Throwable e) {
      ...
    }
  }
  LOG.info(myString + " terminating");
}
```
In summary, the core work of the PacketResponder thread is:
- Receive acks from the downstream node.
- Compare ack.seqno with pkt.seqno at the head of the queue.
- If they match, send the ack upstream and dequeue the packet.
- If this was the last packet of the block, transition the block to the FINALIZED state.
Somewhat by accident, we've also covered the pipeline-response logic along the way...
A quick look at the dequeue and peek-at-head methods used by the PacketResponder thread:
```java
Packet waitForAckHead(long seqno) throws InterruptedException {
  synchronized(ackQueue) {
    while (isRunning() && ackQueue.size() == 0) {
      if (LOG.isDebugEnabled()) {
        LOG.debug(myString + ": seqno=" + seqno +
            " waiting for local datanode to finish write.");
      }
      ackQueue.wait();
    }
    return isRunning() ? ackQueue.getFirst() : null;
  }
}

...

private void removeAckHead() {
  synchronized(ackQueue) {
    ackQueue.removeFirst();
    ackQueue.notifyAll();
  }
}
```
Packets are enqueued at the tail and dequeued at the head. Since the PacketResponder thread is the only consumer:
- After peeking at the head, as long as nothing is dequeued, the queue stays non-empty and the head element does not change.
- The first dequeue after a peek is therefore guaranteed to remove exactly the element that was just seen at the head.
We also need to look at PacketResponder#finalizeBlock():
```java
private void finalizeBlock(long startTime) throws IOException {
  BlockReceiver.this.close();
  ...
  block.setNumBytes(replicaInfo.getNumBytes());
  datanode.data.finalizeBlock(block);
  datanode.closeBlock(
      block, DataNode.EMPTY_DEL_HINT, replicaInfo.getStorageUuid());
  ...
}
```
Closing the block from the datanode's perspective: FsDatasetImpl#finalizeBlock()
FsDatasetImpl#finalizeBlock():
```java
public synchronized void finalizeBlock(ExtendedBlock b) throws IOException {
  if (Thread.interrupted()) {
    throw new IOException("Cannot finalize block from Interrupted Thread");
  }
  ReplicaInfo replicaInfo = getReplicaInfo(b);
  if (replicaInfo.getState() == ReplicaState.FINALIZED) {
    return;
  }
  finalizeReplica(b.getBlockPoolId(), replicaInfo);
}

...

private synchronized FinalizedReplica finalizeReplica(String bpid,
    ReplicaInfo replicaInfo) throws IOException {
  FinalizedReplica newReplicaInfo = null;
  if (replicaInfo.getState() == ReplicaState.RUR &&
      ((ReplicaUnderRecovery)replicaInfo).getOriginalReplica().getState() ==
        ReplicaState.FINALIZED) {
    newReplicaInfo = (FinalizedReplica)
        ((ReplicaUnderRecovery)replicaInfo).getOriginalReplica();
  } else {
    FsVolumeImpl v = (FsVolumeImpl)replicaInfo.getVolume();
    File f = replicaInfo.getBlockFile();
    if (v == null) {
      throw new IOException("No volume for temporary file " + f +
          " for block " + replicaInfo);
    }

    File dest = v.addFinalizedBlock(
        bpid, replicaInfo, f, replicaInfo.getBytesReserved());
    newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
    ...
  }
  volumeMap.add(bpid, newReplicaInfo);

  return newReplicaInfo;
}
```
FsVolumeImpl#addFinalizedBlock():
```java
File addFinalizedBlock(String bpid, Block b,
    File f, long bytesReservedForRbw) throws IOException {
  releaseReservedSpace(bytesReservedForRbw);
  return getBlockPoolSlice(bpid).addBlock(b, f);
}
```
Remember the relationship between FsVolumeImpl and BlockPoolSlice from the analysis of the datanode startup process? Here the operation is delegated further to BlockPoolSlice#addBlock():
As we can see, a BlockPoolSlice only manages blocks in the FINALIZED state.
```java
File addBlock(Block b, File f) throws IOException {
  File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId());
  if (!blockDir.exists()) {
    if (!blockDir.mkdirs()) {
      throw new IOException("Failed to mkdirs " + blockDir);
    }
  }
  File blockFile = FsDatasetImpl.moveBlockFiles(b, f, blockDir);
  ...
  return blockFile;
}
```
BlockPoolSlice in turn relies on the static method FsDatasetImpl.moveBlockFiles() provided by FsDatasetImpl:
```java
static File moveBlockFiles(Block b, File srcfile, File destdir)
    throws IOException {
  final File dstfile = new File(destdir, b.getBlockName());
  final File srcmeta = FsDatasetUtil.getMetaFile(srcfile, b.getGenerationStamp());
  final File dstmeta = FsDatasetUtil.getMetaFile(dstfile, b.getGenerationStamp());
  try {
    NativeIO.renameTo(srcmeta, dstmeta);
  } catch (IOException e) {
    throw new IOException("Failed to move meta file for " + b
        + " from " + srcmeta + " to " + dstmeta, e);
  }
  try {
    NativeIO.renameTo(srcfile, dstfile);
  } catch (IOException e) {
    throw new IOException("Failed to move block file for " + b
        + " from " + srcfile + " to " + dstfile.getAbsolutePath(), e);
  }
  ...
  return dstfile;
}
```
It simply moves the block file and the meta file from their original directory (the rbw directory, corresponding to the RBW state) to the finalized directory (corresponding to the FINALIZED state).
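As a rough illustration, the move looks roughly like the listing below; the block id and generation stamp are made-up example values, and the exact subdirectory levels depend on the block id via DatanodeUtil.idToBlockDir():

```text
.../current/BP-.../current/rbw/blk_1073741825
.../current/BP-.../current/rbw/blk_1073741825_1001.meta
        |  moveBlockFiles()
        v
.../current/BP-.../current/finalized/subdir0/subdir0/blk_1073741825
.../current/BP-.../current/finalized/subdir0/subdir0/blk_1073741825_1001.meta
```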
At this point, the block write on the datanode side is complete.
However, the metadata on the namenode has not been updated yet, so the datanode still has to report the received block to the namenode.
- Thread safety is guaranteed by FsDatasetImpl#finalizeReplica() (a synchronized method).
- Throughout FsDatasetImpl#finalizeReplica(), the block's original location never matters; the state-transition logic itself guarantees correctness.
Closing the block from the namenode's perspective: DataNode#closeBlock()
DataNode#closeBlock():
```java
void closeBlock(ExtendedBlock block, String delHint, String storageUuid) {
  metrics.incrBlocksWritten();
  BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
  if (bpos != null) {
    bpos.notifyNamenodeReceivedBlock(block, delHint, storageUuid);
  } else {
    LOG.warn("Cannot find BPOfferService for reporting block received for bpid="
        + block.getBlockPoolId());
  }

  FsVolumeSpi volume = getFSDataset().getVolume(block);
  if (blockScanner != null && !volume.isTransientStorage()) {
    blockScanner.addBlock(block);
  }
}
```
BPOfferService#notifyNamenodeReceivedBlock():
```java
void notifyNamenodeReceivedBlock(
    ExtendedBlock block, String delHint, String storageUuid) {
  checkBlock(block);
  ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
      block.getLocalBlock(),
      ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,
      delHint);

  for (BPServiceActor actor : bpServices) {
    actor.notifyNamenodeBlock(bInfo, storageUuid, true);
  }
}
```
BPServiceActor#notifyNamenodeBlock():
```java
void notifyNamenodeBlock(ReceivedDeletedBlockInfo bInfo,
    String storageUuid, boolean now) {
  synchronized (pendingIncrementalBRperStorage) {
    addPendingReplicationBlockInfo(
        bInfo, dn.getFSDataset().getStorage(storageUuid));
    sendImmediateIBR = true;
    if (now) {
      pendingIncrementalBRperStorage.notifyAll();
    }
  }
}
```
The core of this method is pendingIncrementalBRperStorage, which tracks the blocks received and deleted between two reports. pendingIncrementalBRperStorage is a buffer: once the received block has been put into the buffer, the notification is considered done (though not necessarily successful); another thread reads the buffer and reports to the namenode asynchronously.
I haven't read that much source code, but this buffer-based design is very common in HDFS and YARN. The buffer decouples the two sides, which not only improves extensibility but also allows different processing rates and batch sizes on either end. With pendingIncrementalBRperStorage, the producer drops in blocks sporadically, while the consumer can process them periodically and in batches. Batched reporting reduces RPC pressure while still keeping reports reasonably timely.
With the help of an IDE, it's easy to see that only the BPServiceActor thread, which sends heartbeats to each namenode, blocks on pendingIncrementalBRperStorage. Later sections analyze how that thread performs the actual report.
PacketResponder#close()
From the analysis of BlockReceiver#receivePacket() and the PacketResponder thread, some acks may still be unsent when the node has received all packets.
Therefore PacketResponder#close() is called to wait until all acks have been sent before shutting down the responder:
```java
public void close() {
  synchronized(ackQueue) {
    while (isRunning() && ackQueue.size() != 0) {
      try {
        ackQueue.wait();
      } catch (InterruptedException e) {
        running = false;
        Thread.currentThread().interrupt();
      }
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug(myString + ": closing");
    }
    running = false;
    ackQueue.notifyAll();
  }

  synchronized(this) {
    running = false;
    notifyAll();
  }
}
```
I haven't figured out what the final synchronized(this) block (the last few lines, which set running = false again and call notifyAll()) is for... explanations are welcome.
The BPServiceActor thread
Following the earlier discussion, we now analyze how the BPServiceActor thread reads the pendingIncrementalBRperStorage buffer and performs the actual report.
BPServiceActor#offerService() calls pendingIncrementalBRperStorage#wait(). Because blocking and wakeup are involved, we can't follow the normal control flow; instead, we start the analysis from the point where the thread is woken up:
```java
long waitTime = dnConf.heartBeatInterval -
    (Time.now() - lastHeartbeat);
synchronized(pendingIncrementalBRperStorage) {
  if (waitTime > 0 && !sendImmediateIBR) {
    try {
      pendingIncrementalBRperStorage.wait(waitTime);
    } catch (InterruptedException ie) {
      LOG.warn("BPOfferService for " + this + " interrupted");
    }
  }
}
```
Readers who have read my article 条件队列大法好:使用wait、notify和notifyAll的正确姿势 (on the correct use of wait/notify/notifyAll) may think the if(){ wait(); } pattern here is incorrect. Revisit the "version 2: premature wakeup" part of that article, consider HDFS's heartbeat mechanism, and think about why the code here is fine. In fact, this is exactly how it should be written here.
If no report is needed right now, the BPServiceActor thread waits for a while; it is precisely this wait window that makes the wakeup from BPServiceActor#notifyNamenodeBlock() meaningful. A simplified sketch of this interplay follows this paragraph.
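The sketch below illustrates the buffer/heartbeat-loop interplay under stated assumptions: the class and method names are made up, the 3-second heartbeat interval is only an example, and the real logic lives in BPServiceActor#offerService() and #notifyNamenodeBlock(). It also shows why an if around wait() is enough: an early wakeup just triggers one extra pass through a loop that re-checks everything anyway.

```java
// Simplified sketch; not the real BPServiceActor.
class IbrSketch {
  private final Object pendingIBR = new Object(); // stands in for pendingIncrementalBRperStorage
  private volatile boolean sendImmediateIBR = false;
  private final long heartBeatInterval = 3000L;   // assumed 3s heartbeat, for illustration only

  // producer side: called after a block is finalized
  void notifyBlock() {
    synchronized (pendingIBR) {
      sendImmediateIBR = true;
      pendingIBR.notifyAll();       // cut the actor's wait short
    }
  }

  // consumer side: the heartbeat loop
  void offerService() throws InterruptedException {
    long lastHeartbeat = 0;
    while (true) {
      long startTime = System.currentTimeMillis();
      if (startTime - lastHeartbeat >= heartBeatInterval) {
        lastHeartbeat = startTime;  // send the heartbeat here
      }
      if (sendImmediateIBR) {
        sendImmediateIBR = false;   // send the incremental block report here
      }
      long waitTime = heartBeatInterval - (System.currentTimeMillis() - lastHeartbeat);
      synchronized (pendingIBR) {
        // An "if" is enough: waking up early just means one extra pass through the
        // loop, which re-checks sendImmediateIBR and the heartbeat deadline anyway.
        if (waitTime > 0 && !sendImmediateIBR) {
          pendingIBR.wait(waitTime);
        }
      }
    }
  }
}
```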
After being woken up, the BPServiceActor thread continues its heartbeat loop:
```java
while (shouldRun()) {
  try {
    final long startTime = now();

    if (startTime - lastHeartbeat >= dnConf.heartBeatInterval) {
```
Assume the heartbeat interval has not yet elapsed, so the if block is not executed.
This is where the volatile variable sendImmediateIBR, which was set in BPServiceActor#notifyNamenodeBlock(), comes into play:
```java
if (sendImmediateIBR ||
    (startTime - lastDeletedReport > dnConf.deleteReportInterval)) {
  reportReceivedDeletedBlocks();
  lastDeletedReport = startTime;
}

List<DatanodeCommand> cmds = blockReport();
processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));

DatanodeCommand cmd = cacheReport();
processCommand(new DatanodeCommand[]{ cmd });
```
Interestingly, the code first sends a standalone report of received and deleted blocks, an RPC that does not need to wait for a return value from the namenode; it then sends a full block report, and this time it does wait for the RPC's return value.
So although block additions and deletions are reported incrementally, each incremental report here is followed by a full report, which keeps the overall cost of incremental reporting quite high.
To improve concurrency, BPServiceActor#notifyNamenodeBlock() returns as soon as it has modified the buffer, without caring whether the report succeeds. It does not need to worry about a failed report either, because consistency is guaranteed in every case: before the report, the block has already been transitioned to FINALIZED, persisted to disk, and recorded in the buffer. If a "block received" report fails, it can be retried at the next heartbeat; if a "block deleted" report fails, it does not matter (the namenode removed the block's metadata long ago); and if the datanode crashes before reporting, it can re-report "block received" after restarting and never needs to re-report "block deleted".
Setting aside the full report for now, let's look only at the standalone report, BPServiceActor#reportReceivedDeletedBlocks():
```java
private void reportReceivedDeletedBlocks() throws IOException {
  ArrayList<StorageReceivedDeletedBlocks> reports =
      new ArrayList<StorageReceivedDeletedBlocks>(pendingIncrementalBRperStorage.size());
  synchronized (pendingIncrementalBRperStorage) {
    for (Map.Entry<DatanodeStorage, PerStoragePendingIncrementalBR> entry :
           pendingIncrementalBRperStorage.entrySet()) {
      final DatanodeStorage storage = entry.getKey();
      final PerStoragePendingIncrementalBR perStorageMap = entry.getValue();

      if (perStorageMap.getBlockInfoCount() > 0) {
        ReceivedDeletedBlockInfo[] rdbi = perStorageMap.dequeueBlockInfos();
        reports.add(new StorageReceivedDeletedBlocks(storage, rdbi));
      }
    }
    sendImmediateIBR = false;
  }

  if (reports.size() == 0) {
    return;
  }

  boolean success = false;
  try {
    bpNamenode.blockReceivedAndDeleted(bpRegistration,
        bpos.getBlockPoolId(),
        reports.toArray(new StorageReceivedDeletedBlocks[reports.size()]));
    success = true;
  } finally {
    if (!success) {
      synchronized (pendingIncrementalBRperStorage) {
        for (StorageReceivedDeletedBlocks report : reports) {
          PerStoragePendingIncrementalBR perStorageMap =
              pendingIncrementalBRperStorage.get(report.getStorage());
          perStorageMap.putMissingBlockInfos(report.getBlocks());
          sendImmediateIBR = true;
        }
      }
    }
  }
}
```
Two points to note:
- The BPServiceActor thread reports regardless of whether the namenode is active or standby (although commands from a standby namenode are ignored).
- If success ends up false, the namenode may in fact have already received the report, but putting the information back into the buffer and reporting it again does no harm:
- Re-reporting a deleted block: the namenode finds no stored information for the block, concludes it has already been deleted, and ignores the report.
- Re-reporting a received block: the namenode finds the newly reported block identical to the information it has already stored and likewise ignores it.
Summary
One client plus one datanode forms the smallest possible pipeline. This article has walked through the block-write process on this minimal pipeline with no errors; building on that, analyzing the pipelined write with errors becomes much easier.